
您所在的位置:网站首页 rabbitmq 清空queue RabbitMQ消息队列官方教程Java学习笔记


2022-06-10 06:22| 来源: 网络整理| 查看: 265

import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); }

public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");

try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME);

//设置n个Server进程 channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build();

String response = "";

try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } };

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }}




Copyright 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3